
[FLINK-38930][checkpoint] Filtering record before processing without spilling strategy#27783

Open
1996fanrui wants to merge 6 commits into apache:master from 1996fanrui:38930/filtering-record

Conversation

@1996fanrui
Member

@1996fanrui 1996fanrui commented Mar 18, 2026

This PR depends on #27782

What is the purpose of the change

[FLINK-38930][checkpoint] Filtering record before processing without spilling strategy

Brief change log

Core filtering mechanism for recovered channel state buffers:

  • ChannelStateFilteringHandler with per-gate GateFilterHandler
  • RecordFilterContext with VirtualChannelRecordFilterFactory
  • Partial data check in SequentialChannelStateReaderImpl
  • Fix RecordFilterContext for Union downscale scenario

Verifying this change

  • Tons of unit tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no

@flinkbot
Collaborator

flinkbot commented Mar 18, 2026

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@1996fanrui 1996fanrui force-pushed the 38930/filtering-record branch from 2b06750 to 997e3a3 on March 20, 2026 at 08:48
Contributor

@pnowojski pnowojski left a comment


Thanks! I've left a couple of comments from the first review pass

* Deserializes records from {@code sourceBuffer}, applies the virtual channel's record
* filter, and re-serializes the surviving records into new buffers.
*/
List<Buffer> filterAndRewrite(
Contributor

could you re-order methods in this class? Public first. Private either below all publics, or below the first usage?

Comment on lines +362 to +373
/**
* Filters a recovered buffer from the specified virtual channel, returning new buffers
* containing only the records that belong to the current subtask.
*
* @return filtered buffers, possibly empty if all records were filtered out.
*/
public List<Buffer> filterAndRewrite(
        int gateIndex,
        int oldSubtaskIndex,
        int oldChannelIndex,
        Buffer sourceBuffer,
        BufferSupplier bufferSupplier)
Contributor

Why does it return List from one single sourceBuffer? Could you explain this in the java doc? And how many Buffers can that be? If a lot, shouldn't this be an Iterator?

Member Author

The code comment is updated.

The returned List can contain more than one buffer when a spanning record completes in this buffer: the deserializer caches partial data from previous buffers, so the output may include data that is not present in the current source buffer.

This is uncommon but possible with any spanning record. If the network buffer pool is insufficient in this case, it is covered by the spilling logic.
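The spanning-record behavior described above can be illustrated with a toy model. `SpanningRecordDemo`, its `feed` method, and the `'|'` record delimiter are invented for illustration; this is not Flink's deserializer API:

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Toy model: bytes of an incomplete record are cached across buffers, so a
// record completed by the current buffer can be longer than that buffer,
// which is why one source buffer can yield more output than it contains.
class SpanningRecordDemo {
    private final ByteArrayOutputStream partial = new ByteArrayOutputStream();

    // Feeds one "network buffer"; '|' marks the end of a record.
    List<String> feed(String buffer) {
        List<String> completed = new ArrayList<>();
        for (char c : buffer.toCharArray()) {
            if (c == '|') {
                completed.add(partial.toString());
                partial.reset();
            } else {
                partial.write(c);
            }
        }
        return completed;
    }
}
```

Feeding "abcde" yields nothing (the record spans on), while the follow-up buffer "fg|x|" completes a seven-byte record from a five-byte buffer.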

Comment on lines +139 to +140
// Extra retain: filterAndRewrite consumes one ref, caller's finally releases another.
buffer.retainBuffer();
Contributor

nit: I think it would be slightly cleaner to call buffer.retainBuffer from the outside; the contract would then be that this method always takes over ownership of the buffer.

Member Author

Addressed together with the ownership concern in comment https://github.com/apache/flink/pull/27783/changes#r2996388666. Removed retainBuffer() and the catch block entirely. The buffer now has a single clean owner per path: in the filtering path, the deserializer recycles the buffer when consumed; the finally uses a defensive isRecycled() check only for the edge case where an exception occurs before the deserializer takes the buffer (e.g., VirtualChannel lookup failure). Added a buffer lifecycle diagram in the javadoc covering all paths. No extra retain/recycle needed.
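The single-owner contract described in this reply can be sketched with a toy ref-counted buffer. `OwnedBuffer`, `FilteringCaller`, and `process` are hypothetical names for illustration, not the PR's code or Flink's `Buffer` API:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy ref-counted buffer illustrating the pattern: the consumer recycles
// the buffer when it takes ownership; the caller's finally block only
// recycles defensively if the hand-off never happened.
class OwnedBuffer {
    private final AtomicInteger refCount = new AtomicInteger(1);

    boolean isRecycled() {
        return refCount.get() == 0;
    }

    void recycle() {
        if (refCount.decrementAndGet() < 0) {
            throw new IllegalStateException("double recycle");
        }
    }
}

class FilteringCaller {
    // Returns true if the consumer took ownership (and recycled the buffer).
    static boolean process(OwnedBuffer buffer, boolean failBeforeHandOff) {
        try {
            if (failBeforeHandOff) {
                // e.g. a lookup failure before the deserializer takes the buffer
                throw new IllegalStateException("failed before hand-off");
            }
            // Hand-off: the "deserializer" consumes and recycles the buffer.
            buffer.recycle();
            return true;
        } catch (RuntimeException e) {
            return false;
        } finally {
            // Defensive release only for the pre-hand-off failure path.
            if (!buffer.isRecycled()) {
                buffer.recycle();
            }
        }
    }
}
```

On both paths the buffer ends up recycled exactly once, with no extra retain/recycle pair.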

Comment on lines +151 to +155
} catch (Throwable t) {
    // filterAndRewrite didn't consume the buffer, release the extra ref.
    buffer.recycleBuffer();
    throw t;
}
Contributor

Hmm, that's a bit strange? It sounds like it's not clear who is owner of this buffer? There should be clean owner that's always responsible for cleaning up, no matter what.

Comment on lines +106 to +118
List<StreamElement> filteredElements = new ArrayList<>();

while (true) {
    DeserializationResult result = vc.getNextRecord(deserializationDelegate);
    if (result.isFullRecord()) {
        filteredElements.add(deserializationDelegate.getInstance());
    }
    if (result.isBufferConsumed()) {
        break;
    }
}

return serializeToBuffers(filteredElements, bufferSupplier);
Contributor

ditto about List in List<StreamElement> filteredElements. It would be safer to be iterative. Current implementation risks OOMs if deserialised records are using more memory than the serialised records. This is not very common, but could happen.

Member Author

Addressed in 62d42c7
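The iterative shape the reviewer suggests might look roughly like this. `IterativeFilter` and its parameters are illustrative stand-ins, with strings in place of Flink's `StreamElement` and buffers:

```java
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Sketch of the iterative form: each surviving record is handed to the
// serializer immediately, so at most one deserialized record is alive at a
// time instead of a whole List<StreamElement> of them (bounding heap usage).
class IterativeFilter {
    static int filterAndRewrite(
            Iterator<String> deserializer,
            Predicate<String> belongsToThisSubtask,
            Consumer<String> serializer) {
        int kept = 0;
        while (deserializer.hasNext()) {
            String record = deserializer.next();
            if (belongsToThisSubtask.test(record)) {
                serializer.accept(record); // re-serialize right away
                kept++;
            }
        }
        return kept;
    }
}
```

The memory win is that the deserialized form, which can be much larger than the serialized bytes, never accumulates.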

resultBuffers.add(currentBuffer.retainBuffer());
}
currentBuffer.recycleBuffer();
currentBuffer = bufferSupplier.requestBufferBlocking();
Contributor

Is it safe to block here? 🤔 Can this lead to deadlocks? I think we were discussing this, but AFAIR this code works differently to what we were discussing offline (either using unpooled buffer or create two different pools, or filter records in-place without requesting new buffer)?

Member Author

Good catch!

This is addressed in a follow-up commit in https://github.com/apache/flink/pull/27639/commits (FLINK-38544, f031ddf) by falling back to a heap buffer when the buffer pool is insufficient.

Contributor

I think it would be better to squash that commit here, to avoid merging broken code given that we already have some working fix for it?

Member Author

Addressed in b12a097

@1996fanrui 1996fanrui force-pushed the 38930/filtering-record branch from 997e3a3 to db2565f on March 26, 2026 at 21:38
…spilling strategy

Core filtering mechanism for recovered channel state buffers:
- ChannelStateFilteringHandler with per-gate GateFilterHandler
- RecordFilterContext with VirtualChannelRecordFilterFactory
- Partial data check in SequentialChannelStateReaderImpl
- Fix RecordFilterContext for Union downscale scenario
@1996fanrui 1996fanrui force-pushed the 38930/filtering-record branch from db2565f to b12a097 on March 27, 2026 at 13:51
Member Author

@1996fanrui 1996fanrui left a comment

Thanks @pnowojski for the review,

All comments make sense to me; I have addressed all of them.


- Add javadoc for filterAndRewrite explaining spanning record multi-buffer output
- Move retainBuffer call to caller for clearer buffer ownership contract
- Implement Closeable for ChannelStateFilteringHandler
- Use try-with-resources in SequentialChannelStateReaderImpl
…during recovery

When unaligned checkpointing during recovery is enabled, use a heap
buffer as fallback instead of blocking on buffer pool, to avoid hanging
if the buffer pool is not yet available. When the feature is disabled,
the original blocking behavior is preserved.
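The fallback described in this commit message could be sketched as follows. `FallbackBufferSupplier` and the non-blocking `Supplier<Optional<ByteBuffer>>` pool shape are assumptions for illustration, not Flink's `BufferSupplier` API:

```java
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.function.Supplier;

// Sketch of the fallback: try the pooled supplier first without blocking;
// if the pool cannot serve a buffer yet (e.g. not initialized during
// recovery), allocate an unpooled heap buffer instead of hanging.
class FallbackBufferSupplier {
    static ByteBuffer request(Supplier<Optional<ByteBuffer>> pool, int size) {
        return pool.get().orElseGet(() -> ByteBuffer.allocate(size));
    }
}
```

The heap buffer is not returned to the pool, so this trades a transient allocation for never blocking on an unavailable pool during recovery.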
@1996fanrui 1996fanrui force-pushed the 38930/filtering-record branch from b12a097 to 26602df on March 27, 2026 at 14:35